use tremor_value::{Value, Object, StaticNode, to_value, Error};
use crate::simd_json::{Mutable,ValueAccess};
use crate::data_frame::value::{RealValue, RealValueType};

/// Builds a chain of single-entry objects, one per segment of `k`, with `v`
/// stored under the last segment.
///
/// For `k = ["a", "b"]` the result is `{"a": {"b": v}}`.
///
/// An empty `k` is treated as a fatal error: it is logged and the process
/// exits with status `-1`.
fn add_object(k: &[String], v: Value<'static>) ->Box<Object<'static>>{
    // Walk the path from the innermost segment outwards.
    let mut segments = k.iter().rev();
    let leaf = match segments.next() {
        Some(name) => name,
        None => {
            error!("add_object k len cannot be zero");
            std::process::exit(-1);
        }
    };

    let mut current = Object::new();
    current.insert(leaf.clone().into(), v);

    // Wrap the object built so far in one new parent per remaining segment.
    for name in segments {
        let mut parent = Object::new();
        parent.insert(name.clone().into(), Value::Object(Box::new(current)));
        current = parent;
    }

    Box::new(current)
}
/// Stores a static copy of `v` in `value` at the nested path `key`.
///
/// * A single-segment path inserts directly into `value`.
/// * For a longer path, if the entry under the first segment is already an
///   object, the function descends into it so that sibling entries of that
///   object are preserved. Otherwise (entry missing or not an object) a fresh
///   chain of nested objects is built for the remaining segments and inserted
///   under the first segment, replacing whatever was there.
///
/// Returns `true` on success. Returns `false` (after printing a message) when
/// `key` is empty or when the underlying `insert` call reports an error.
pub fn set_key_value<'a,'b>(key: &'a [String], v: Value<'a>, value: &'a mut Value<'b>)->bool{
    let (head, rest) = match key.split_first() {
        Some(parts) => parts,
        None => {
            println!("set_key_value key is empty");
            return false;
        }
    };

    if rest.is_empty() {
        return match value.insert(head.clone(), v.clone_static()) {
            Ok(_) => true,
            Err(e) => {
                println!("set_key_value1 err {:?}", e);
                false
            }
        };
    }

    // Merge into an existing nested object instead of overwriting it, so that
    // setting `a.b` does not discard an existing `a.x`.
    if let Some(child) = value.get_mut(head.as_str()) {
        if let Value::Object(_) = child {
            return set_key_value(rest, v, child);
        }
    }

    let nested = add_object(rest, v.clone_static());
    match value.insert(head.clone(), Value::Object(nested)) {
        Ok(_) => true,
        Err(e) => {
            println!("set_key_value2 err {:?}", e);
            false
        }
    }
}
/// Looks up the nested path `k` inside `obj` and converts the value found
/// there with `value_to_realvalue`.
///
/// Each segment but the last is expected to name a nested object. If a
/// segment resolves to a non-object value before the path is exhausted, that
/// value itself is converted and returned and the remaining segments are
/// ignored.
///
/// Returns `None` when a segment is missing, when the final value has no
/// `RealValue` representation, or when `k` is empty (which is also logged as
/// an error).
fn get_object(k: &[String], obj: &Object)->Option<RealValue>{
    let (head, rest) = match k.split_first() {
        Some(parts) => parts,
        None => {
            // A lookup helper must not terminate the process; report the
            // misuse and let the caller see "no value".
            error!("get_object k len cannot be zero");
            return None;
        }
    };

    let found = obj.get(head.as_str())?;
    match found {
        // Descend into the nested object that was just found, not into `obj`
        // again, so that each segment is resolved one level deeper.
        Value::Object(inner) if !rest.is_empty() => get_object(rest, inner),
        other => value_to_realvalue(other),
    }
}
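/*
Querying JSON values of Array type is not supported for now.
Querying JSON values of Object type is not supported for now either, as that would be more complex. Deleting an Object is allowed, however.
*/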
/// Converts a scalar-like `Value` into the matching `RealValue`.
///
/// Unsigned, signed and floating-point numbers, booleans, strings and byte
/// buffers map to their `RealValue` counterparts. `null`, objects and arrays
/// have no counterpart and yield `None`.
fn value_to_realvalue(v: &Value)->Option<RealValue>{
    let converted = match v {
        Value::Static(StaticNode::U64(n)) => RealValue::U64(*n),
        Value::Static(StaticNode::I64(n)) => RealValue::I64(*n),
        Value::Static(StaticNode::F64(n)) => RealValue::F64(*n),
        Value::Static(StaticNode::Bool(flag)) => RealValue::Boolean(*flag),
        Value::String(text) => RealValue::Utf8(text.to_string()),
        Value::Bytes(raw) => RealValue::Bytes(raw.to_vec()),
        // Containers and null cannot be expressed as a RealValue.
        Value::Static(StaticNode::Null) | Value::Object(_) | Value::Array(_) => return None,
    };
    Some(converted)
}
/// Reads the value stored at the nested path `k` inside `value` and converts
/// it to a `RealValue`.
///
/// The first segment is looked up directly on `value`. When more segments
/// remain and the entry is an object, the rest of the path is resolved by
/// `get_object`. When the entry is not an object, it is converted as-is and
/// any remaining segments are ignored.
///
/// Returns `None` for an empty path, a missing key, or a value that
/// `value_to_realvalue` cannot convert.
pub fn get_key_value<'a,'b,'c>(k: &'a [String], value: &'b Value<'c>)->Option<RealValue>{
    let (head, rest) = k.split_first()?;
    let found = value.get(head.as_str())?;

    match found {
        Value::Object(inner) if !rest.is_empty() => get_object(rest, inner),
        other => value_to_realvalue(other),
    }
}

/// Removes the entry at the nested path `k` from `value`.
///
/// Returns `true` when `k` is empty (nothing to do) or when the `remove` call
/// on the innermost container reports `Ok`. Returns `false` when an
/// intermediate segment cannot be resolved via `get_mut` or when `remove`
/// reports an error.
pub fn delete_key<'a,'b,'c>(k: &'a [String], value: &'b mut Value<'c>)->bool{
    let (head, rest) = match k.split_first() {
        Some(parts) => parts,
        None => return true,
    };

    if rest.is_empty() {
        // Last segment: remove it from the current container.
        return value.remove(head.as_str()).is_ok();
    }

    match value.get_mut(head.as_str()) {
        Some(child) => delete_key(rest, child),
        None => false,
    }
}

/// Reports whether the nested path `k` resolves to an entry inside `value`.
///
/// Every segment except the last is followed with `get`; the last segment is
/// checked with `contains_key`. An empty path, or any segment that cannot be
/// followed, yields `false`.
pub fn key_exist<'a,'b,'c>(k: &'a [String], value: &'b Value<'c>)->bool{
    let (leaf, parents) = match k.split_last() {
        Some(parts) => parts,
        None => return false,
    };

    // Walk down through the parent segments one level at a time.
    let mut node = value;
    for segment in parents {
        node = match node.get(segment.as_str()) {
            Some(next) => next,
            None => return false,
        };
    }

    node.contains_key(leaf.as_str())
}

fn string_data_convert(input: String, new_type: Option<RealValueType>)->Result<Value<'static>, Error> {
    match new_type {
        Some(t) => {
            match t {
                RealValueType::U8 => {to_value(input.parse::<u8>().map_err(|e| Error::from(e.to_string().as_str()))?)}
                RealValueType::U16 => {to_value(input.parse::<u16>().map_err(|e| Error::from(e.to_string().as_str()))?)}
                RealValueType::U32 => {to_value(input.parse::<u32>().map_err(|e| Error::from(e.to_string().as_str()))?)}
                RealValueType::U64 => {to_value(input.parse::<u64>().map_err(|e| Error::from(e.to_string().as_str()))?)}
                RealValueType::I8 => {to_value(input.parse::<i8>().map_err(|e| Error::from(e.to_string().as_str()))?)}
                RealValueType::I16 => {to_value(input.parse::<i16>().map_err(|e| Error::from(e.to_string().as_str()))?)}
                RealValueType::I32 => {to_value(input.parse::<i32>().map_err(|e| Error::from(e.to_string().as_str()))?)}
                RealValueType::I64 => {to_value(input.parse::<i64>().map_err(|e| Error::from(e.to_string().as_str()))?)}
                RealValueType::F32 => {to_value(input.parse::<f32>().map_err(|e| Error::from(e.to_string().as_str()))?)}
                RealValueType::F64 => {to_value(input.parse::<f64>().map_err(|e| Error::from(e.to_string().as_str()))?)}
                RealValueType::Utf8 => {to_value(input)}
                RealValueType::Boolean => {
                    match input.to_lowercase().as_str() {
                        "true" | "t" | "yes" | "y" => to_value(true),
                        "false" | "f" | "no" | "n" | "0" => to_value(false),
                        _ => {Err(Error::from("string_data_convert 0 err, unknown convert type"))}
                    }
                }
                _ => {Err(Error::from("string_data_convert 1 err, unknown convert type"))}
            }
        }
        None => {Err(Error::from("string_data_convert 2 err, unknown convert type"))}
    }
}

fn static_data_convert(s: &StaticNode, new_type: Option<RealValueType>)->Result<Value<'static>, Error> {
    match new_type {
        Some(t) => {
            match t {
                RealValueType::U8 => {to_value(s.as_u64().map(|s| s as u8))}
                RealValueType::U16 => {to_value(s.as_u64().map(|s| s as u16))}
                RealValueType::U32 => {to_value(s.as_u64().map(|s| s as u32))}
                RealValueType::U64 => {to_value(s.as_u64())}
                RealValueType::I8 => {to_value(s.as_i64().map(|s| s as i8))}
                RealValueType::I16 => {to_value(s.as_i64().map(|s| s as i16))}
                RealValueType::I32 => {to_value(s.as_i64().map(|s| s as i32))}
                RealValueType::I64 => {to_value(s.as_i64())}
                RealValueType::F32 => {to_value(s.as_f64().map(|s| s as f32))}
                RealValueType::F64 => {to_value(s.as_f64())}
                RealValueType::Utf8 => {to_value(s.to_string())}
                RealValueType::Boolean => {to_value(s.as_i64().map(|s| s!=0))}
                _ => {Err(Error::from("static_data_convert 0 err, unknown convert type"))}
            }
        }
        None => {Err(Error::from("static_data_convert 1 err, unknown convert type"))}
    }
}

/// Converts the entry at the nested path `key` inside `value` to `new_type`,
/// replacing the entry in place.
///
/// Intermediate segments are followed with `get_mut`. At the last segment a
/// static node is converted with `static_data_convert` and a string with
/// `string_data_convert`; any other kind of value is rejected.
///
/// Returns `Err(())` when the path is empty, a segment is missing, the entry
/// has an unsupported kind, the conversion fails or yields null, or the final
/// `insert` reports an error.
pub fn convert_value<'a,'b,'c>(key: &'a [String], new_type: Option<RealValueType>, value: &'b mut Value<'c>)->Result<(),()>{
    let (head, rest) = match key.split_first() {
        Some(parts) => parts,
        None => return Err(()),
    };

    if !rest.is_empty() {
        return match value.get_mut(head.as_str()) {
            Some(child) => convert_value(rest, new_type, child),
            None => Err(()),
        };
    }

    let converted = match value.get(head.as_str()) {
        Some(Value::Static(node)) => static_data_convert(node, new_type),
        Some(Value::String(text)) => string_data_convert(text.to_string(), new_type),
        _ => return Err(()),
    };

    match converted {
        // A null result means the conversion produced nothing usable.
        Ok(data) if data != Value::Static(StaticNode::Null) => {
            value.insert(head.clone(), data).map(|_| ()).map_err(|_| ())
        }
        _ => Err(()),
    }
}

/// Replaces the array of strings at the nested path `key` with a single
/// string in which the elements are joined by `,`.
///
/// The entry is left untouched when the path is empty, a segment is missing,
/// the entry is not an array, any element is not a string, or the joined
/// result is the empty string. The result of the final `insert` is ignored.
pub fn encode_value<'a,'b,'c>(key: &'a [String], value: &'b mut Value<'c>){
    let (head, rest) = match key.split_first() {
        Some(parts) => parts,
        None => return,
    };

    if !rest.is_empty() {
        if let Some(child) = value.get_mut(head.as_str()) {
            encode_value(rest, child);
        }
        return;
    }

    let joined = match value.get(head.as_str()) {
        Some(Value::Array(items)) => {
            // Collect every element as &str; a single non-string element
            // aborts the whole encoding.
            let pieces: Option<Vec<&str>> = items
                .iter()
                .map(|item| match item {
                    Value::String(text) => {
                        let piece: &str = text;
                        Some(piece)
                    }
                    _ => None,
                })
                .collect();
            match pieces {
                // `join` places a separator between every pair of elements,
                // so empty strings (including leading ones) keep their slot.
                Some(pieces) => pieces.join(","),
                None => return,
            }
        }
        _ => return,
    };

    if joined.is_empty() {
        return;
    }
    let _ = value.insert(head.clone(), joined);
}

/// Replaces the string at the nested path `key` with an array holding the
/// pieces obtained by splitting that string on `,`.
///
/// The entry is left untouched when the path is empty, a segment is missing,
/// or the entry is not a string. Results of `push` and of the final `insert`
/// are ignored.
pub fn decode_value<'a,'b,'c>(key: &'a [String], value: &'b mut Value<'c>){
    let (head, rest) = match key.split_first() {
        Some(parts) => parts,
        None => return,
    };

    if !rest.is_empty() {
        if let Some(child) = value.get_mut(head.as_str()) {
            decode_value(rest, child);
        }
        return;
    }

    let pieces = match value.get(head.as_str()) {
        Some(Value::String(text)) => {
            let mut list = Value::array();
            text.split(',').for_each(|piece| {
                let _ = list.push(piece.to_string());
            });
            list
        }
        _ => return,
    };

    let _ = value.insert(head.clone(), pieces);
}
